package io.mqttpush.mqttserver.service;

import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mqttpush.mqttserver.beans.ConstantBean;
import io.mqttpush.mqttserver.beans.SendableMsg;
import io.mqttpush.mqttserver.beans.ServiceBeans;
import io.mqttpush.mqttserver.exception.SendException;
import io.mqttpush.mqttserver.exception.SendException.SendError;
import io.mqttpush.mqttserver.util.ByteBufEncodingUtil;
import io.mqttpush.mqttserver.util.StashMessage;
import io.mqttpush.mqttserver.util.thread.MyHashRunnable;
import io.mqttpush.mqttserver.util.thread.SingelThreadPool;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.Attribute;

/**
 * message 实际发送服务
 * 
 * @author tzj
 * 
 */
public class MessagePushService {

	final int retimes = 3;

	Logger logger=LoggerFactory.getLogger(getClass());

	TopicManager topicManager;

	SingelThreadPool singleThreadPool;

	ChannelUserService channelUserService;

	public MessagePushService() {

		ServiceBeans serviceBeans = ServiceBeans.getInstance();
		channelUserService = serviceBeans.getChannelUserService();
		topicManager = serviceBeans.getTopicService();
		singleThreadPool = serviceBeans.getSingleThreadPool();

	}

	/**
	 * 搜索知道主题下面所有设备发送
	 * 
	 * @param sendableMsg
	 */
	public void mutilSend(final SendableMsg sendableMsg) {

		BiConsumer<String, MqttQoS> consumer = (deviceId, mqttQos) -> {

			Runnable sendrunnable = () -> {
				sendableMsg.getMsgContent().retain();
				singleSendProcess(deviceId, mqttQos, sendableMsg);
			};
			singleThreadPool.execute(new MyHashRunnable(getClass(), deviceId, sendrunnable, 0));

		};

		topicManager.channelsAction(sendableMsg.getTopName(), consumer);
		

	}

	/**
	 * 单独给一个设备发送
	 * @param deviceId
	 * @param mqttQos
	 * @param sendableMsg
	 */
	public void  singleSendProcess(String deviceId, MqttQoS mqttQos,SendableMsg sendableMsg) {
		
		
		Channel channel = channelUserService.channel(deviceId);
		ChannelFuture channelFuture=null;
		
		if (sendableMsg.isRetain()) {
			sendableMsg.getByteForContent();
		}
		
		if(channel==null) {
			
			if (sendableMsg.isRetain()) {
				send2Admin(
						ByteBufEncodingUtil.getInatance().saveMQByteBuf(
								ByteBufAllocator.DEFAULT,System.currentTimeMillis(), 
								deviceId,
								Unpooled.wrappedBuffer(sendableMsg.getByteForContent())
						));
				logger.info("设备不在线"+deviceId+"将给admin");
			}
		
			
			
			return;
		}
		
		
		
		channelFuture=actualSend(sendableMsg,channel, mqttQos);
		
		
		if(channelFuture==null) {
			logger.warn(deviceId+"可能channel空了!");
			return;
		}
		
		/**
		 * 只要失败了就暂存消息
		 */
		channelFuture.addListener((ChannelFuture future) -> {

			/**
			 * 发送成功的话只要有保留标志就得发给admin
			 */
			if (sendableMsg.isRetain()) {
				send2Admin(
						ByteBufEncodingUtil.getInatance().saveMQByteBuf(
								ByteBufAllocator.DEFAULT,System.currentTimeMillis(), 
								deviceId,
								Unpooled.wrappedBuffer(sendableMsg.getByteForContent())
						));
				logger.info("发送消息"+deviceId+"将给admin");
			}
			
			if (!future.isSuccess()) {					
				logger.warn("发送失败", future.cause());
			}else {
				/**
				 * 发送成功的话回记录最后通话的对端
				 */
				channel.attr(ConstantBean.LASTSENT_DEVICEID).set(deviceId);
			}

		});
	}
	

	/**
	 * 实际发送
	 * 
	 * @param sendableMsg 发送的对象
	 * @param channel     接收方的channnel
	 * @param deviceId    发送放deviceid
	 * @return
	 */
	public ChannelFuture actualSend(SendableMsg sendableMsg, Channel channel, MqttQoS mqttQoS) {

		if (channel == null ) {
			return    null;
		}
		
		if(!channel.isActive()) {
			return channel.newFailedFuture(new SendException(SendError.CHANNEL_OFF));
		}

		if (sendableMsg.getDupTimes() > ConstantBean.MAX_ERROR_SENT) {
			/**
			 * 当超过最大次数以后就清楚了重发机制
			 */
			channel.attr(ConstantBean.UnConfirmedKey).set(null);
			logger.warn(channel+"连续发送不成功");
			return channel.newFailedFuture(new SendException(SendError.FAIL_MAX_COUNT));
		}

		ByteBuf sendBuf = sendableMsg.getMsgContent();
		if (sendBuf == null || !sendBuf.isReadable()) {
			return channel.newFailedFuture(new SendException(SendError.BUFF_FREED));
		}

		/**
		 * 保证通道里面只有一个待发消息。 如果有其他待发消息就发给admin 处理
		 */
		if (channel.hasAttr(ConstantBean.UnConfirmedKey)) {

		
			Attribute<SendableMsg> attribute = channel.attr(ConstantBean.UnConfirmedKey);
			if (attribute != null && (attribute.get()) != null) {
				/**
				 * 清楚原有的send对象
				 */
				logger.warn("覆盖以前的sendable");
				channel.attr(ConstantBean.UnConfirmedKey).set(null);
			}
		}

		
		if (mqttQoS == MqttQoS.EXACTLY_ONCE || mqttQoS == MqttQoS.AT_LEAST_ONCE) {
			sendableMsg.addDupTimes();
			channel.attr(ConstantBean.UnConfirmedKey).set(sendableMsg);

		}

		MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
				MqttMessageType.PUBLISH, 
				sendableMsg.isDup(), 
				mqttQoS,sendableMsg.isRetain(), 0);

		MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(sendableMsg.getTopName(),
				sendableMsg.getMessageId());

		MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, sendBuf);

		ChannelFuture channelFuture = channel.writeAndFlush(mqttPublishMessage);

		return channelFuture;

	}

	/**
	 * 发送通知给给admin
	 * 
	 * @param payload
	 */
	public void send2Admin(ByteBuf payload) {

		/**
		 * 使用channelgroup 和 组播发送都可以发送给adminTopic
		 */

		MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE,
				false, 0);

		MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(ConstantBean.adminRecivTopic,
				payload.hashCode());

		MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);

		topicManager.channelGroupWrite(ConstantBean.adminRecivTopic, mqttPublishMessage);

	}

}
